package com.kafka.demo.consumer.cluster;

import com.kafka.demo.cons.KafkaTopic;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Cluster consumer that acknowledges (commits) offsets manually.
 *
 * <p>Registered as a Spring component; its single listener method is bound to
 * {@link KafkaTopic#MY_TOPIC} through the {@code clusterListenerFactory1}
 * container factory.
 */
@Component
@Slf4j
public class AckCommitConsumer1 {

    @KafkaListener(
            id = "cluster-consumer1",
            topics = KafkaTopic.MY_TOPIC,
            // NOTE(review): in spring-kafka 1.3+ `group` looks like a deprecated alias for
            // `containerGroup` and would not set the consumer group.id; `groupId` is presumably
            // what is intended here. Confirm against the spring-kafka version in use.
            group = "${kafka-consumer.services.cluster-consumer1.group-id}",
            containerFactory = "clusterListenerFactory1"
    )
    /**
     * Logs the received record and then acknowledges it.
     *
     * @param record the consumed record; its partition, offset, key and value are logged at INFO
     * @param ack    acknowledgment handle for the record; acknowledged after logging.
     *               Presumably requires the container factory to use a manual ack mode.
     *               TODO confirm against the {@code clusterListenerFactory1} configuration.
     */
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // Every field uses the same "Name={}" shape; the original "Value{}" was missing the '='.
        log.info("收到消息: Partition={}, Offset={}, Key={}, Value={}",
                record.partition(),
                record.offset(),
                record.key(),
                record.value());

        // Manually commit the offset for this record.
        ack.acknowledge();
    }
}